import logging
from typing import Literal

from werkzeug import Request as WerkzeugRequest

from localstack.aws.api import CommonServiceException, ServiceException
from localstack.aws.api.sqs import (
    Message,
    QueueAttributeName,
    QueueDoesNotExist,
    ReceiveMessageResult,
)
from localstack.aws.protocol.parser import create_parser
from localstack.aws.protocol.serializer import aws_response_serializer
from localstack.aws.spec import load_service
from localstack.http import Request, route
from localstack.services.sqs.models import (
    FifoQueue,
    SqsMessage,
    SqsQueue,
    StandardQueue,
    sqs_stores,
    to_sqs_api_message,
)
from localstack.services.sqs.utils import (
    parse_queue_url,
)
from localstack.utils.aws.request_context import extract_region_from_headers

LOG = logging.getLogger(__name__)


class InvalidAddress(ServiceException):
    """
    Client-side error (HTTP 404) raised by the developer endpoints in this module when the ``QueueUrl`` passed by
    the caller cannot be parsed.
    """

    # HTTP-level classification of the error
    status_code = 404
    sender_fault = True

    # error code and message placed into the serialized error response
    code = "InvalidAddress"
    message = "The address https://queue.amazonaws.com/ is not valid for this endpoint."


def get_sqs_protocol(request: Request) -> Literal["query", "json"]:
    """
    Determine which SQS wire protocol a request uses, based on its ``Content-Type`` header.

    :param request: the incoming HTTP request; only ``request.headers`` is read
    :return: ``"json"`` if the media type of the request is ``application/x-amz-json-1.0``, otherwise ``"query"``
        (this includes requests that carry no ``Content-Type`` header at all)
    """
    content_type = request.headers.get("Content-Type") or ""
    # only the media type itself is significant: strip parameters such as ``; charset=utf-8`` and ignore letter
    # case, so that the result does not depend on how a client happens to spell the header value
    media_type = content_type.split(";", 1)[0].strip().lower()
    return "json" if media_type == "application/x-amz-json-1.0" else "query"


def sqs_auto_protocol_aws_response_serializer(service_name: str, operation: str):
    """
    Decorator factory that wraps a request handler with ``aws_response_serializer``, choosing the protocol
    (``query`` or ``json``) per invocation from the incoming request via ``get_sqs_protocol``.

    The decorator works for plain functions (request is the first positional argument), for methods (request is
    the second positional argument, after ``self``), and for handlers that receive the request as the ``request``
    keyword argument.

    :param service_name: the service name passed through to ``aws_response_serializer``
    :param operation: the operation name passed through to ``aws_response_serializer``
    :return: a decorator for request handler functions or methods
    """
    from functools import wraps

    def _decorate(fn):
        # keep the name, docstring and other metadata of the handler, so that decorated endpoints remain
        # distinguishable from each other instead of all appearing as ``_proxy``
        @wraps(fn)
        def _proxy(*args, **kwargs):
            # extract request from function invocation (decorator can be used for methods as well as for functions).
            if len(args) > 0 and isinstance(args[0], WerkzeugRequest):
                # function
                request = args[0]
            elif len(args) > 1 and isinstance(args[1], WerkzeugRequest):
                # method (arg[0] == self)
                request = args[1]
            elif "request" in kwargs:
                request = kwargs["request"]
            else:
                raise ValueError(f"could not find Request in signature of function {fn}")

            # the protocol is only known once the request is available, which is why the serializer decorator
            # is applied here at call time rather than once at decoration time
            protocol = get_sqs_protocol(request)
            return aws_response_serializer(service_name, operation, protocol)(fn)(*args, **kwargs)

        return _proxy

    return _decorate


class SqsDeveloperApi:
    """
    A set of SQS developer tool endpoints:

    - ``/_aws/sqs/messages``: list SQS messages without side effects, compatible with ``ReceiveMessage``.
    - ``/_aws/sqs/messages/<region>/<account_id>/<queue_name>``: the same listing, with the queue identified by
      the URL path instead of a ``QueueUrl`` parameter.
    """

    def __init__(self, stores=None):
        """
        :param stores: the container queues are looked up in, indexed as ``stores[account_id][region]``. Defaults
            to the module-level ``sqs_stores`` when omitted.
        """
        # compare against None explicitly, so that an empty (falsy) container passed by the caller is respected
        # instead of being silently replaced by the global stores
        self.stores = sqs_stores if stores is None else stores

    @route("/_aws/sqs/messages", methods=["GET", "POST"])
    @sqs_auto_protocol_aws_response_serializer("sqs", "ReceiveMessage")
    def list_messages(self, request: Request) -> ReceiveMessageResult:
        """
        This endpoint expects a ``QueueUrl`` request parameter (either as query arg or form parameter), similar to
        the ``ReceiveMessage`` operation. It will parse the Queue URL generated by one of the SQS endpoint strategies.

        :param request: the incoming HTTP request
        :return: the messages of the queue, in the shape of a ``ReceiveMessage`` result
        :raises CommonServiceException: if an AWS client request is parsed as an operation other than
            ``ReceiveMessage``
        :raises QueueDoesNotExist: if no ``QueueUrl`` was given, or the queue cannot be found in the stores
        :raises InvalidAddress: if the ``QueueUrl`` cannot be parsed
        """

        if "x-amz-" in request.mimetype or "x-www-form-urlencoded" in request.mimetype:
            # only parse the request using a parser if it comes from an AWS client
            protocol = get_sqs_protocol(request)
            operation, service_request = create_parser(
                load_service("sqs", protocol=protocol)
            ).parse(request)
            if operation.name != "ReceiveMessage":
                raise CommonServiceException(
                    "InvalidRequest", "This endpoint only accepts ReceiveMessage calls"
                )
        else:
            # plain HTTP call (e.g., from a browser or curl): take the parameters directly from the request values
            service_request = dict(request.values)

        queue_url = service_request.get("QueueUrl")
        if not queue_url:
            raise QueueDoesNotExist()

        try:
            account_id, region, queue_name = parse_queue_url(queue_url)
        except ValueError:
            # log the URL that failed to parse; the traceback is only attached when debug logging is enabled
            LOG.error(
                "Error while parsing Queue URL from request values: %s",
                queue_url,
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
            raise InvalidAddress()

        if not region:
            # the parsed queue URL carried no region, fall back to the one indicated by the request headers
            region = extract_region_from_headers(request.headers)

        return self._get_and_serialize_messages(request, region, account_id, queue_name)

    @route("/_aws/sqs/messages/<region>/<account_id>/<queue_name>")
    @sqs_auto_protocol_aws_response_serializer("sqs", "ReceiveMessage")
    def list_messages_for_queue_url(
        self, request: Request, region: str, account_id: str, queue_name: str
    ) -> ReceiveMessageResult:
        """
        This endpoint extracts the region, account_id, and queue_name directly from the URL rather than requiring the
        QueueUrl as parameter.

        :param request: the incoming HTTP request
        :param region: the region of the queue
        :param account_id: the account the queue belongs to
        :param queue_name: the name of the queue
        :return: the messages of the queue, in the shape of a ``ReceiveMessage`` result
        :raises QueueDoesNotExist: if the queue cannot be found in the stores
        """
        return self._get_and_serialize_messages(request, region, account_id, queue_name)

    def _get_and_serialize_messages(
        self,
        request: Request,
        region: str,
        account_id: str,
        queue_name: str,
    ) -> ReceiveMessageResult:
        """
        Looks up the queue and returns its messages, honoring the optional ``ShowInvisible`` and ``ShowDelayed``
        request parameters (enabled by the values ``true`` or ``1``, case-insensitive).

        :param request: the incoming HTTP request, used to read the optional flags
        :param region: the region of the queue
        :param account_id: the account the queue belongs to
        :param queue_name: the name of the queue
        :return: a ``ReceiveMessageResult`` holding the collected messages
        :raises QueueDoesNotExist: if the account, region, or queue lookup fails with a ``KeyError``
        """
        show_invisible = request.values.get("ShowInvisible", "").lower() in ["true", "1"]
        show_delayed = request.values.get("ShowDelayed", "").lower() in ["true", "1"]

        try:
            store = self.stores[account_id][region]
            queue = store.queues[queue_name]
        except KeyError:
            LOG.info(
                "no queue named %s in region %s and account %s", queue_name, region, account_id
            )
            raise QueueDoesNotExist()

        messages = self._collect_messages(
            queue, show_invisible=show_invisible, show_delayed=show_delayed
        )

        return ReceiveMessageResult(Messages=messages)

    def _collect_messages(
        self, queue: SqsQueue, show_invisible: bool = False, show_delayed: bool = False
    ) -> list[Message]:
        """
        Retrieves from a given SqsQueue all visible messages without causing any side effects (not setting any
        receive timestamps, receive counts, or visibility state).

        :param queue: the queue
        :param show_invisible: show invisible messages as well
        :param show_delayed: show delayed messages as well
        :return: a list of messages
        :raises ValueError: if the queue is neither a ``StandardQueue`` nor a ``FifoQueue``
        """
        receipt_handle = "SQS/BACKDOOR/ACCESS"  # dummy receipt handle

        sqs_messages: list[SqsMessage] = []

        if show_invisible:
            sqs_messages.extend(queue.inflight)

        if isinstance(queue, StandardQueue):
            sqs_messages.extend(queue.visible.queue)
        elif isinstance(queue, FifoQueue):
            if show_invisible:
                for inflight_group in queue.inflight_groups:
                    # messages that have been received are held in ``queue.inflight``, even for FIFO queues. however,
                    # for fifo queues, messages that are in the same message group as messages that have been
                    # received, are also considered invisible, and are held here in ``inflight_group.messages``.
                    sqs_messages.extend(inflight_group.messages)

            for message_group in queue.message_group_queue.queue:
                # these are all messages of message groups that are visible
                sqs_messages.extend(message_group.messages)
        else:
            raise ValueError(f"unknown queue type {type(queue)}")

        if show_delayed:
            sqs_messages.extend(queue.delayed)

        messages = []

        for sqs_message in sqs_messages:
            message: Message = to_sqs_api_message(sqs_message, [QueueAttributeName.All], ["All"])
            # these are all non-standard fields so we squelch the linter
            if show_invisible:
                message["Attributes"]["IsVisible"] = str(sqs_message.is_visible).lower()  # noqa
            if show_delayed:
                message["Attributes"]["IsDelayed"] = str(sqs_message.is_delayed).lower()  # noqa
            # every listed message gets the same dummy receipt handle
            message["ReceiptHandle"] = receipt_handle
            messages.append(message)

        return messages
